
pub mod util;

use once_cell::sync::OnceCell;
use std::collections::{HashMap, VecDeque};
use std::sync::{RwLock, RwLockWriteGuard, RwLockReadGuard, Arc};
use tokio::runtime::Runtime;
use serde::Serialize;
use crate::util::{AsyncSend, async_channel_new, AsyncRecv};
use std::ops::Deref;
use std::collections::hash_map::Iter;
use crate::conf_init::{EventFlowNodeConfInstance, MapResult, ResultErr};
use crate::logical_plan::work_node::util::ErrResult;
use crate::logical_plan::work_space::{WorkSpaceDebug, work_space_add_next, work_space_ctl_get};
use crate::run_plan::{RunPlan, HandResult, RunPlanInstance};
use crate::logical_plan::{logicalplan_map};
use crate::logical_plan::func::FuncInstance;
use crate::data_frame::RealValue;
use crate::logical_plan::expr::conf_func_map_logicalplan_expr;
use crate::logical_plan::filter::FilterInstance;
use std::ops::DerefMut;

/*
[[work_node]]
name: aaa
filter: bbb
func:
    - {condition: aaaaa, desc: bbbbbb}
    - {condition: aaaaa, desc: bbbbbb}
*/
#[derive(Deserialize)]
pub struct FuncConf{
    name: String,
    condition: Option<String>,
    desc: String,
}
#[derive(Deserialize)]
pub struct PreDefine{
    name: String,
    desc: String,
}
#[derive(Deserialize)]
pub struct FilterConf{
    name: String,
    desc: String,
}
#[derive(Deserialize)]
pub struct WorkNodeConf{
    name: String,
    input: Vec<String>,
    pre_define: Option<Vec<PreDefine>>,
    filter: Option<FilterConf>,
    func: Option<Vec<FuncConf>>,
}
pub struct WorkNode{
    name: String,
    input: HashMap<String, AsyncRecv>,
    filter: Vec<(AsyncSend,AsyncRecv,Option<Box<FilterInstance>>)>,
    func: Vec<(AsyncSend,AsyncRecv,Option<Box<FuncInstance>>)>,
    next_channel: (AsyncSend, AsyncRecv),
    next: HashMap<String, AsyncSend>,
    work_space: String,
    runtime: Arc<Runtime>,
    debug: Arc<WorkSpaceDebug>,
}

lazy_static!{
    static ref WORK_NODE_LIST: RwLock<HashMap<String, WorkNode>> = {
        match work_node_init_from_conf(&EventFlowNodeConfInstance.work_node){
            MapResult::Ok(t) => {t},
            MapResult::Err(e) => {
                error!("work_node_init_from_conf failed, [{:?}]", e);
                std::process::exit(-1);
            },
        }
    };
}

fn work_node_reg_with_seq(work_node_name: &str,
                          work_node_seq: &HashMap<String,&WorkNodeConf>,
                          work_node_bak: &mut HashMap<String, WorkNode>)-> MapResult<()>{

    if let Some(work_node_conf) = work_node_seq.get(work_node_name){
        for input in work_node_conf.input.iter(){
            if let Some(_) = work_node_bak.get_mut(input){
                continue;
            }
            if let Some(_) = work_node_seq.get(input){
                work_node_reg_with_seq(input, work_node_seq, work_node_bak)?;
            }
        }
        work_node_reg(work_node_conf, work_node_bak)?;
    }else{
        let err_desc = format!("work_node [{}] not exists when register", work_node_name);
        return MapResult::Err(ResultErr::new(err_desc));
    }

    return MapResult::Ok(());
}

fn work_node_dag_check(work_node_seq: &HashMap<String, &WorkNodeConf>)-> MapResult<()>{
    //DAG检测的基本思想为：从任意节点出发，都不会回到原点

    for (work_node_name,work_node_conf) in work_node_seq.iter(){
        let mut checked_seq_list = VecDeque::new();
        let mut checked_list = HashMap::new();

        work_node_dag_check_one(work_node_name, work_node_seq, &mut checked_list, &mut checked_seq_list)?;
    }

    return MapResult::Ok(());
}
fn work_node_dag_check_one(node_name: &str, work_node_seq: &HashMap<String, &WorkNodeConf>,
                           checked_list: &mut HashMap<String, ()>,
                           checked_seq_list: &mut VecDeque<String>)
                           -> MapResult<()>{
    //基于深度优先的DAG检测，同时要防止子序列存在环路导致检测过程死循环。
    if let Some(work_node) = work_node_seq.get(node_name){
        checked_seq_list.push_back(node_name.to_string());
        if let Some(_) = checked_list.insert(node_name.to_string(),()){
            //检测到环路了
            let mut err_result = ResultErr::new(format!("dag checked"));

            for t in checked_seq_list.iter(){
                err_result.append(t.to_string());
            }
            return MapResult::Err(err_result);
        }

        for pre in work_node.input.iter(){
            work_node_dag_check_one(pre, work_node_seq, checked_list, checked_seq_list)?;
            checked_list.remove(pre);
            checked_seq_list.pop_back();
        }
        return MapResult::Ok(());
    }else{
        //touch the top work_node
        return MapResult::Ok(());
    }
}

fn work_node_init_from_conf(work_node_conf: &'static Option<Vec<WorkNodeConf>>)-> MapResult<RwLock<HashMap<String,WorkNode>>>{
    let mut work_node_bak = HashMap::new();
    let mut work_node_seq = HashMap::new();

    if let Some(ref work_node_conf) = work_node_conf{

        for one_work_node_conf in work_node_conf.iter(){
            work_node_seq.insert(one_work_node_conf.name.clone(), one_work_node_conf.clone());
        }

        work_node_dag_check(&work_node_seq)?;

        for (work_node_name, work_node_conf) in work_node_seq.iter(){
            work_node_reg_with_seq(work_node_name, &work_node_seq, &mut work_node_bak)?;
        }
    }

    return MapResult::Ok(RwLock::new(work_node_bak));
}
fn work_node_reg(conf: &WorkNodeConf, node_list: &mut HashMap<String, WorkNode>)-> MapResult<()>{

    if let Some(_) = node_list.get(&conf.name){
        return MapResult::Ok(());
    }

    info!("work_node register [{}]", conf.name);

    let mut work_space = None;

    //input
    let mut input_list = HashMap::new();
    for input in conf.input.iter(){
        if let MapResult::Ok(r) = work_space_add_next(input.as_str(), conf.name.as_str()){
            if let Some(ref work_space_ref) = work_space{
                if work_space_ref != input{
                    let err_desc = format!("work_node [{}] input cannot belong to different work_space [{}] and [{}]", conf.name, input, work_space_ref);
                    return MapResult::Err(ResultErr::new(err_desc));
                }
            }else{
                work_space = Some(input.to_string());
            }

            input_list.insert(input.to_string(), r);
        }else{

            match work_node_add_next(input.as_str(), conf.name.as_str()){
                MapResult::Ok((pre_work_space,r)) => {
                    if let Some(ref work_space_ref) = work_space{
                        if work_space_ref.as_str() != pre_work_space.as_str(){
                            let err_desc = format!("work_node [{}] cannot belong to different work_space [{}] and [{}]", conf.name, pre_work_space, work_space_ref);
                            return MapResult::Err(ResultErr::new(err_desc));
                        }
                    }else{
                        work_space = Some(input.to_string());
                    }

                    input_list.insert(input.to_string(), r);
                }
                MapResult::Err(e) => {return MapResult::Err(e);},
            }
        }
    }

    if input_list.is_empty(){
        let err_desc = format!("work_node [{}] input is empty", conf.name);
        return MapResult::Err(ResultErr::new(err_desc));
    }

    let mut pre_define = HashMap::new();
    //pre_define expr
    if let Some(pre_define_expr) = &conf.pre_define{
        for t in pre_define_expr.iter(){
            let pre_expr = conf_func_map_logicalplan_expr(&t.desc, &pre_define)?;
            pre_define.insert(t.name.to_string(), pre_expr);
        }
    }

    //filter
    let mut filter_list = Vec::new();
    if let Some(ref filter) = &conf.filter{
        let filter_expr = conf_func_map_logicalplan_expr(&filter.desc, &pre_define)?;
        let filter_runplan = logicalplan_map(&filter_expr)?;
        let (s,r) = async_channel_new();
        filter_list.push((s,r,Some(Box::new(FilterInstance{func: Box::new(RunPlanInstance{name:filter.name.to_string(),run_plan:filter_runplan})}))));
    }

    //func
    let mut func_list = Vec::new();
    if let Some(ref func) = &conf.func{

        for f in func.iter(){
            let condition = if let Some(ref condition) = f.condition{
                let condition_expr = conf_func_map_logicalplan_expr(&condition, &pre_define)?;
                let condition_runplan = logicalplan_map(&condition_expr)?;
                Some(RunPlanInstance::new(&f.name, condition_runplan))
            }else{
                None
            };
            let func = conf_func_map_logicalplan_expr(&f.desc, &pre_define)?;
            let func = logicalplan_map(&func)?;

            let (s,r) = async_channel_new();

            func_list.push((s,r,Some(Box::new(FuncInstance{condition:condition, func: RunPlanInstance::new(&f.name,func)}))));
        }
    }

    let (work_space,work_space_runtime, debug) = if let Some(work_space) = work_space{
        let (work_space_runtime, debug) = work_space_ctl_get(work_space.as_str())?;
        (work_space, work_space_runtime, debug)
    }else{
        let err_desc = format!("work_node [{}] work_space is none", conf.name);
        return MapResult::Err(ResultErr::new(err_desc));
    };

    let tmp = WorkNode{
        name: conf.name.to_string(),
        input: input_list,
        filter: filter_list,
        func: func_list,
        next_channel: async_channel_new(),
        next: HashMap::new(),
        work_space: work_space,
        runtime: work_space_runtime,
        debug: debug,
    };

    node_list.insert(conf.name.to_string(), tmp);

    return MapResult::Ok(());
}

fn work_node_add_next(work_node_name: &str, next_name: &str)->MapResult<(String,AsyncRecv)>{

    if let Some(work_node) = WORK_NODE_LIST.write().unwrap().get_mut(work_node_name){
        let (s,r) = async_channel_new();

        work_node.next.insert(next_name.to_string(), s);
        return MapResult::Ok((work_node.work_space.clone(),r));
    }else{
        let err_desc = format!("work_node [{}] add next, but not exist", work_node_name);
        return MapResult::Err(ResultErr::new(err_desc));
    }
}

pub fn work_node_run()->MapResult<()>{
    /*
    每一个work_node：
        1）每一个input都启动一个异步feature，负责把数据分发给第一个filter或者第一个func或者next channel
        2）每一个filter都启动一个异步feature，负责数据的判断和发给下一个filter或者第一个func或者next channel
        3）每一个func都启动一个异步feature，负责本func的condition判断和hand处理，并把数据发给下一个func或者next channel
        4）启动一个feature负责把next channel的数据传给所有next
    */

    for (work_node_name, work_node) in WORK_NODE_LIST.write().unwrap().iter_mut(){
        //each work_node

        let mut filter_list = Vec::new();
        for (s,r,_) in work_node.filter.iter(){
            filter_list.push((s.clone(),r.clone()));
        }

        let mut func_list = Vec::new();
        for (s,r,_) in work_node.func.iter(){
            func_list.push((s.clone(),r.clone()));
        }

        //each input
        for (_input_name,input) in work_node.input.iter(){
            let work_node_name = work_node_name.to_string();
            let debug = Arc::clone(&work_node.debug);
            let input = input.clone();
            let next_send = match filter_list.first(){
                Some((s,_)) => {s.clone()},
                None => {
                    match func_list.first(){
                        Some((s,_)) => {s.clone()},
                        None => {
                            work_node.next_channel.0.clone()
                        },
                    }
                },
            };

            work_node.runtime.spawn(
                work_node_input(work_node_name, input, next_send, debug)
            );
        }

        //each filter
        for (index,(_,r,filter)) in work_node.filter.iter_mut().enumerate(){
            let work_node_name = work_node_name.to_string();
            let debug = Arc::clone(&work_node.debug);
            let filter = match filter.take(){
                Some(t) => {t},
                None => {
                    let err_desc = format!("work_node {} filter is None when run", work_node_name);
                    return MapResult::Err(ResultErr::new(err_desc));
                },
            };
            let input = r.clone();
            let next_send = if let Some((s,_)) = filter_list.get(index+1){
                s.clone()
            }else{
                work_node.next_channel.0.clone()
            };

            work_node.runtime.spawn(
                work_node_filter(work_node_name, input, filter, next_send, debug)
            );
        }

        //each func
        for (index,(_,r,func)) in work_node.func.iter_mut().enumerate(){
            let work_node_name = work_node_name.to_string();
            let debug = Arc::clone(&work_node.debug);
            let func = match func.take(){
                Some(t) => {t},
                None => {
                    let err_desc = format!("work_node {} func is None when run", work_node_name);
                    return MapResult::Err(ResultErr::new(err_desc));
                },
            };
            let input = r.clone();
            let next_send = if let Some((s,_)) = func_list.get(index+1){
                s.clone()
            }else{
                work_node.next_channel.0.clone()
            };

            work_node.runtime.spawn(
                work_node_func(work_node_name, input, func, next_send, debug)
            );
        }

        //next
        let next = work_node.next.clone();
        let input = work_node.next_channel.1.clone();

        work_node.runtime.spawn(
            work_node_next(input, next)
        );
    }

    return MapResult::Ok(());
}
async fn work_node_func(work_node_name: String, input: AsyncRecv, mut func: Box<FuncInstance>, next: AsyncSend, debug: Arc<WorkSpaceDebug>){
    loop{
        if let Some(mut data) = input.recv().await{
            if let Some(ref mut condition) = func.condition{
                //这里做一个约定，只要有condition，那么只能是condition正确执行，并且结果是bool true的时候，才能执行
                let condition_ret = condition.evaluate(&mut data).await;
                if let HandResult::Ok(condition_ret) = condition_ret{
                    if let RealValue::Boolean(condition_ret) = condition_ret.value{
                        if condition_ret{
                            match func.func.evaluate(&mut data).await{
                                HandResult::Ok(_) => {},
                                HandResult::Err(mut e) => {},
                            }
                        }
                    }
                }
            }else{
                match func.func.evaluate(&mut data).await{
                    HandResult::Ok(_) => {},
                    HandResult::Err(mut e) => {},
                }
            }

            next.send(data).await;
        }
    }
}
async fn work_node_next(input: AsyncRecv, next: HashMap<String, AsyncSend>){
    loop{
        if let Some(data) = input.recv().await{
            for (_,next) in next.iter(){
                next.send(data.clone()).await;
            }
        }
    }
}
async fn work_node_filter(work_node_name: String, input: AsyncRecv, mut filter: Box<FilterInstance>, next: AsyncSend, debug: Arc<WorkSpaceDebug>){
    loop{
        if let Some(mut data) = input.recv().await{
            match filter.func.evaluate(&mut data).await{
                HandResult::Ok(t) => {
                    if let RealValue::Boolean(f) = t.value{
                        if f{
                            next.send(data).await;
                        }
                    }
                },
                HandResult::Err(e) => {},
            }
        }
    }
}
async fn work_node_input(work_node_name: String, input: AsyncRecv, next: AsyncSend, debug: Arc<WorkSpaceDebug>){
    loop{
        if let Some(mut data) = input.recv().await{
            if let Some(d) = debug.event_flow_path{
                if d{
                    data.metadata.append_event_flow_path(&work_node_name);
                }
            }
            next.send(data).await;
        }
    }
}
